# PySpark imports. SQLContext and SparkConf are imported but not used in the
# cells shown here — kept as-is to avoid breaking anything out of view.
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql.window import Window
from pyspark.conf import SparkConf
# Reuse the session the notebook kernel already started (or create one).
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# EMR notebook-scoped libraries: install packages on the cluster for the
# lifetime of this Spark session only (hence the pip output below).
sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("kiwisolver==1.2.0")
sc.install_pypi_package("matplotlib==3.1.1")
sc.install_pypi_package("chart_studio")
sc.install_pypi_package("iso3166")
sc.install_pypi_package("python-dateutil")
Collecting pandas==0.25.1 Downloading https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl (10.4MB) Requirement already satisfied: pytz>=2017.2 in /usr/local/lib/python3.7/site-packages (from pandas==0.25.1) Requirement already satisfied: numpy>=1.13.3 in /usr/local/lib64/python3.7/site-packages (from pandas==0.25.1) Collecting python-dateutil>=2.6.1 (from pandas==0.25.1) Downloading https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl (247kB) Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil>=2.6.1->pandas==0.25.1) Installing collected packages: python-dateutil, pandas Successfully installed pandas-0.25.1 python-dateutil-2.8.2 Collecting kiwisolver==1.2.0 Downloading https://files.pythonhosted.org/packages/31/b9/6202dcae729998a0ade30e80ac00f616542ef445b088ec970d407dfd41c0/kiwisolver-1.2.0-cp37-cp37m-manylinux1_x86_64.whl (88kB) Installing collected packages: kiwisolver Successfully installed kiwisolver-1.2.0 Collecting matplotlib==3.1.1 Downloading https://files.pythonhosted.org/packages/19/7a/60bd79c5d79559150f8bba866dd7d434f0a170312e4d15e8aefa5faba294/matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl (13.1MB) Requirement already satisfied: python-dateutil>=2.1 in /mnt/tmp/1636805823672-0/lib/python3.7/site-packages (from matplotlib==3.1.1) Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 (from matplotlib==3.1.1) Downloading https://files.pythonhosted.org/packages/a0/34/895006117f6fce0b4de045c87e154ee4a20c68ec0a4c9a36d900888fb6bc/pyparsing-3.0.6-py3-none-any.whl (97kB) Collecting cycler>=0.10 (from matplotlib==3.1.1) Downloading https://files.pythonhosted.org/packages/5c/f9/695d6bedebd747e5eb0fe8fad57b72fdf25411273a39791cde838d5a8f51/cycler-0.11.0-py3-none-any.whl Requirement already satisfied: numpy>=1.11 in 
/usr/local/lib64/python3.7/site-packages (from matplotlib==3.1.1) Requirement already satisfied: kiwisolver>=1.0.1 in /mnt/tmp/1636805823672-0/lib/python3.7/site-packages (from matplotlib==3.1.1) Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil>=2.1->matplotlib==3.1.1) Installing collected packages: pyparsing, cycler, matplotlib Successfully installed cycler-0.11.0 matplotlib-3.1.1 pyparsing-3.0.6 Collecting chart_studio Downloading https://files.pythonhosted.org/packages/ca/ce/330794a6b6ca4b9182c38fc69dd2a9cbff60fd49421cb8648ee5fee352dc/chart_studio-1.1.0-py3-none-any.whl (64kB) Collecting plotly (from chart_studio) Downloading https://files.pythonhosted.org/packages/68/b2/7c7d111affebb281b1085680db7358f6204920bb5574e9e84db05c530688/plotly-5.3.1-py2.py3-none-any.whl (23.9MB) Requirement already satisfied: six in /usr/local/lib/python3.7/site-packages (from chart_studio) Collecting requests (from chart_studio) Downloading https://files.pythonhosted.org/packages/92/96/144f70b972a9c0eabbd4391ef93ccd49d0f2747f4f6a2a2738e99e5adc65/requests-2.26.0-py2.py3-none-any.whl (62kB) Collecting retrying>=1.3.3 (from chart_studio) Downloading https://files.pythonhosted.org/packages/44/ef/beae4b4ef80902f22e3af073397f079c96969c69b2c7d52a57ea9ae61c9d/retrying-1.3.3.tar.gz Collecting tenacity>=6.2.0 (from plotly->chart_studio) Downloading https://files.pythonhosted.org/packages/f2/a5/f86bc8d67c979020438c8559cc70cfe3a1643fd160d35e09c9cca6a09189/tenacity-8.0.1-py3-none-any.whl Collecting idna<4,>=2.5; python_version >= "3" (from requests->chart_studio) Downloading https://files.pythonhosted.org/packages/04/a2/d918dcd22354d8958fe113e1a3630137e0fc8b44859ade3063982eacd2a4/idna-3.3-py3-none-any.whl (61kB) Collecting urllib3<1.27,>=1.21.1 (from requests->chart_studio) Downloading https://files.pythonhosted.org/packages/af/f4/524415c0744552cce7d8bf3669af78e8a069514405ea4fcbd0cc44733744/urllib3-1.26.7-py2.py3-none-any.whl (138kB) 
Collecting charset-normalizer~=2.0.0; python_version >= "3" (from requests->chart_studio) Downloading https://files.pythonhosted.org/packages/de/c8/820b1546c68efcbbe3c1b10dd925fbd84a0dda7438bc18db0ef1fa567733/charset_normalizer-2.0.7-py3-none-any.whl Collecting certifi>=2017.4.17 (from requests->chart_studio) Downloading https://files.pythonhosted.org/packages/37/45/946c02767aabb873146011e665728b680884cd8fe70dde973c640e45b775/certifi-2021.10.8-py2.py3-none-any.whl (149kB) Building wheels for collected packages: retrying Running setup.py bdist_wheel for retrying: started Running setup.py bdist_wheel for retrying: finished with status 'done' Stored in directory: /var/lib/livy/.cache/pip/wheels/d7/a9/33/acc7b709e2a35caa7d4cae442f6fe6fbf2c43f80823d46460c Successfully built retrying Installing collected packages: tenacity, plotly, idna, urllib3, charset-normalizer, certifi, requests, retrying, chart-studio Successfully installed certifi-2021.10.8 charset-normalizer-2.0.7 chart-studio-1.1.0 idna-3.3 plotly-5.3.1 requests-2.26.0 retrying-1.3.3 tenacity-8.0.1 urllib3-1.26.7 Collecting iso3166 Downloading https://files.pythonhosted.org/packages/32/d8/1fc7d9f5b11b4490c8a9ac9fd79efb466e142468428b71dea42d5756e257/iso3166-2.0.2-py3-none-any.whl Installing collected packages: iso3166 Successfully installed iso3166-2.0.2 Requirement already satisfied: python-dateutil in /mnt/tmp/1636805823672-0/lib/python3.7/site-packages Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil) You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command. You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command. You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command. 
You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command. You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command. You are using pip version 9.0.1, however version 21.3.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command.
from datetime import datetime
from dateutil import parser
import matplotlib.pyplot as plt
import numpy as np
An error was encountered: No module named 'matplotlib' Traceback (most recent call last): ModuleNotFoundError: No module named 'matplotlib'
The OpenAQ dataset, containing air-pollution measurements from a few days, will be used.
# Load the OpenAQ real-time JSON files for 2021-11-01 .. 2021-11-05
# (the [1-5] glob character class matches those five day directories).
smog_df = spark.read.json("s3a://openaq-fetches/realtime/2021-11-0[1-5]/*")
smog_df.printSchema()
root |-- attribution: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- name: string (nullable = true) | | |-- url: string (nullable = true) |-- averagingPeriod: struct (nullable = true) | |-- unit: string (nullable = true) | |-- value: double (nullable = true) |-- city: string (nullable = true) |-- coordinates: struct (nullable = true) | |-- latitude: double (nullable = true) | |-- longitude: double (nullable = true) |-- country: string (nullable = true) |-- date: struct (nullable = true) | |-- local: string (nullable = true) | |-- utc: string (nullable = true) |-- location: string (nullable = true) |-- mobile: boolean (nullable = true) |-- parameter: string (nullable = true) |-- sourceName: string (nullable = true) |-- sourceType: string (nullable = true) |-- unit: string (nullable = true) |-- value: double (nullable = true)
# Preview the first 20 rows (show() default) of the raw data.
smog_df.show()
+--------------------+---------------+-----+--------------------+-------+--------------------+--------------------+------+---------+--------------+----------+-----+------+
| attribution|averagingPeriod| city| coordinates|country| date| location|mobile|parameter| sourceName|sourceType| unit| value|
+--------------------+---------------+-----+--------------------+-------+--------------------+--------------------+------+---------+--------------+----------+-----+------+
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-15T20:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-15T21:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-15T22:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-15T23:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T00:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T01:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T02:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T03:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T04:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T05:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T06:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T07:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T08:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T09:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T10:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T11:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T12:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T13:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T14:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
|[{EPA AirNow DOS,...| {hours, 1.0}|Kabul|{34.535812, 69.19...| AF|{2021-08-16T15:00...|US Diplomatic Pos...| false| pm25|StateAir_Kabul|government|µg/m³|-999.0|
+--------------------+---------------+-----+--------------------+-------+--------------------+--------------------+------+---------+--------------+----------+-----+------+
only showing top 20 rows
# Keep only measurements from 2021-11-01 (inclusive) to 2021-11-06 (exclusive).
# ``date.utc`` is an ISO-8601 string (the preview suggests the
# "...T00:00:00.000Z" form — confirm against the data), so these bounds compare
# correctly as plain strings.
# BUG FIX: the original literals used the malformed form "T00:00:000Z", which
# sorts AFTER "T00:00:00.000Z" and therefore silently dropped the measurements
# taken exactly at midnight on the lower bound.
smog_5_days_df = smog_df.where(
    (F.col("date.utc") >= F.lit("2021-11-01T00:00:00.000Z")) &
    (F.col("date.utc") < F.lit("2021-11-06T00:00:00.000Z"))
)
Find cities in Poland, where PM10 pollution was lower than maximal PM10 pollution in Berlin (in the date range used). Perform calculations for data from a few days. Sort results descending by PM10 concentration.
# Maximum PM10 concentration recorded in Berlin over the selected days.
# A global aggregation yields a single Row; [0] extracts the scalar.
berlin_max_row = (
    smog_5_days_df
    .where((F.col("city") == "Berlin") & (F.col("parameter") == "pm10"))
    .agg(F.max("value"))
    .first()
)
berlin_max = berlin_max_row[0]
berlin_max
34.75
# Polish cities whose average PM10 concentration over the period stayed below
# Berlin's maximum, sorted from most to least polluted. Filtering before the
# projection makes it explicit which source columns the predicates use.
cities = (
    smog_5_days_df
    .where(
        (F.col("country") == "PL") &
        (F.col("parameter") == "pm10")
    )
    .select("city", "value")
    .groupBy("city")
    .agg(F.avg("value").alias("avg_value"))
    .where(F.col("avg_value") < berlin_max)
    .orderBy(F.col("avg_value").desc())
)
cities.show()
+---------------+------------------+ | city| avg_value| +---------------+------------------+ | Zamość|33.808115942028984| | Mosina| 32.14942263768117| | Jarosław| 31.98960950724638| | Przemyśl| 30.92014379710145| | Lublin|30.859649122807017| | Chełm|30.570238095238096| | Zabierzów|30.169969639639636| | Biała Podlaska| 29.58787610619468| |Sucha Beskidzka| 29.2139244057971| | Pabianice| 29.16374269005848| | Zdzieszowice| 29.14047282674772| | Lębork|28.728237982456143| | Rzeszów|28.511712763157895| | Guty Duże|28.377553956834536| | Zgierz| 28.3574144486692| | Bydgoszcz| 27.98038632986627| | Nowa Ruda|27.977136842105267| | Kraków|27.783044329670325| | Nowy Sącz|27.642063947368417| | Sierpc|27.629854651162795| +---------------+------------------+ only showing top 20 rows
For each hour (in UTC) calculate the highest PM2.5 pollution in a selected city. Sort the results ascending by date.
def get_date_and_hour(date) -> datetime:
    """Truncate an OpenAQ ``date`` struct's UTC timestamp to the whole hour.

    ``date`` is the struct column (fields ``utc``/``local``) handed to the UDF
    as a Row/dict-like object — the original ``date: str`` annotation was
    misleading, since the body indexes ``date["utc"]``.
    """
    parsed = parser.isoparse(date["utc"])
    # Zero microseconds too so every timestamp lands exactly on the hour
    # (the original replace() left them untouched).
    return parsed.replace(minute=0, second=0, microsecond=0)


# BUG FIX: the original registered a nonexistent name ``parse_datetime``,
# which raised NameError; register the function defined above instead.
get_date_and_hour_udf = udf(get_date_and_hour, TimestampType())
def print_n_records_for_city(city: str, n: int) -> None:
    """Print the hourly maximum PM2.5 reading for ``city`` (first ``n`` rows)."""
    matches_city_pm25 = (F.col("city") == city) & (F.col("parameter") == "pm25")
    hourly_max = (
        smog_5_days_df
        .select("city", "date", "parameter", "value")
        .na.drop()
        .withColumn("datetime", get_date_and_hour_udf("date"))
        .where(matches_city_pm25)
        .groupBy("datetime")
        .agg(F.max("value").alias("max_pm25_value"))
        .orderBy("datetime")
    )
    hourly_max.show(n)


print_n_records_for_city("Warszawa", 20)
+-------------------+--------------+ | datetime|max_pm25_value| +-------------------+--------------+ |2021-11-01 01:00:00| 31.85| |2021-11-01 02:00:00| 32.32| |2021-11-01 03:00:00| 28.41| |2021-11-01 04:00:00| 27.47| |2021-11-01 05:00:00| 28.06| |2021-11-01 06:00:00| 30.87| |2021-11-01 07:00:00| 26.1| |2021-11-01 08:00:00| 28.43| |2021-11-01 09:00:00| 22.86| |2021-11-01 10:00:00| 25.75| |2021-11-01 11:00:00| 20.55| |2021-11-01 12:00:00| 25.02| |2021-11-01 13:00:00| 17.82| |2021-11-01 14:00:00| 16.94| |2021-11-01 15:00:00| 21.15| |2021-11-01 16:00:00| 27.81| |2021-11-01 17:00:00| 23.61| |2021-11-01 18:00:00| 25.34| |2021-11-01 19:00:00| 28.17| |2021-11-01 20:00:00| 27.75| +-------------------+--------------+ only showing top 20 rows
Calculate the average PM2.5 pollution over the highest $N$ measurements per city and order cities by this value. Use only cities whose names consist solely of lowercase and uppercase letters of the Latin alphabet.
def get_most_polluted_cities(n: int):
    """Average of the ``n`` highest PM2.5 readings per city, most polluted first.

    Only cities whose names consist purely of Latin letters (``^[a-zA-Z]+$``)
    are considered, per the task statement.

    BUG FIX: the original window ordered by ``value`` ascending, so
    ``rank <= n`` selected the LOWEST n measurements per city even though the
    task asks for the highest ones. ``row_number`` replaces ``rank`` so that
    exactly n rows per city are kept even when values tie.
    """
    window = Window.partitionBy("city").orderBy(F.col("value").desc())
    ranked = (
        smog_5_days_df
        .select("city", "parameter", "value")
        .na.drop()
        .where(
            (F.col("parameter") == "pm25") &
            (F.col("city").rlike("^[a-zA-Z]+$"))
        )
        .withColumn("rank", F.row_number().over(window))
    )
    return (
        ranked
        .select("city", "value", "rank")
        .where(F.col("rank") <= n)
        .groupBy("city")
        .agg(F.avg("value").alias(f"avg_pm25_top_{n}_measurements"))
        .orderBy(F.desc(f"avg_pm25_top_{n}_measurements"))
    )


get_most_polluted_cities(10).show(10)
+-----------+----------------------------+ | city|avg_pm25_top_10_measurements| +-----------+----------------------------+ | Baghpat| 96.6| | Ghaziabad| 82.33333333333333| | Hapur| 77.8| |Bulandshahr| 76.4| | Noida| 75.09545454545456| | Kaithal| 73.976| | Rohtak| 65.37| | Meerut| 63.0| | Mandikhera| 60.67| |Bahadurgarh| 59.395| +-----------+----------------------------+ only showing top 10 rows
Create visualization to show changes in average pollution over a few days for selected countries.
def parse_date(date):
    """Extract the calendar date from an OpenAQ ``date`` struct's UTC field.

    ``date`` is the struct column (fields ``utc``/``local``) passed to the UDF
    as a Row/dict-like object, not a plain string as the previous annotation
    claimed; the return value is a ``datetime.date``.
    """
    parsed = parser.isoparse(date["utc"])
    return parsed.date()


# Register as a UDF returning a SQL DATE column.
parse_date_udf = udf(parse_date, DateType())
def get_country_heatmap(country: str):
    """Daily average PM2.5 per measurement location for one ``country``.

    Returns a DataFrame with date, latitude, longitude and the day's average
    value, ordered by date then coordinates — ready for map visualization.
    """
    daily_pm25 = (
        smog_5_days_df
        .select("coordinates", "value", "date")
        .na.drop()
        .where(
            (F.col("parameter") == "pm25") &
            (F.col("country") == country)
        )
        .withColumn("date", parse_date_udf("date"))
    )
    return (
        daily_pm25
        .groupBy("coordinates", "date")
        .agg(F.avg("value").alias("avg_value"))
        .orderBy("date", "coordinates")
        .select("date", "coordinates.latitude", "coordinates.longitude", "avg_value")
    )
%%spark -o pdf
# Runs on the cluster; sparkmagic's -o mirrors the result into the local
# kernel as a pandas DataFrame named ``pdf``.
pdf = get_country_heatmap("PL")
%%local
# Everything below runs in the local IPython kernel, not on the cluster.
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.figure_factory as ff
# NOTE(review): hard-coded Mapbox access token committed to the notebook —
# rotate it and load it from an environment variable / secret store instead.
px.set_mapbox_access_token("pk.eyJ1Ijoib3Jlb3JlIiwiYSI6ImNrdndwNm13aTBhZzYyb2xjcng4dGdnNWEifQ.LADFJcYpNwTupseSVwA72g")
# Plotly animation frames need plain strings, not datetime.date objects.
pdf = pdf.astype({"date": str})
# Hexagonal-bin pollution heatmap, animated day by day.
fig = ff.create_hexbin_mapbox(
    data_frame=pdf,
    lat="latitude",
    lon="longitude",
    nx_hexagon=8,
    opacity=0.5,
    color="avg_value",
    labels={"color": "Avg value"},
    min_count=1,
    animation_frame="date",
    agg_func=np.mean
)
fig.show()
from time import time

# Full September 2021: a larger dataset used to time the PM10 query when run
# on different cluster sizes.
smog_df = spark.read.json("s3a://openaq-fetches/realtime/2021-09-*/*")
# BUG FIX: well-formed ISO-8601 bounds — the original "T00:00:000Z" literals
# were malformed and shifted the string-comparison boundary slightly.
smog_month_df = smog_df.where(
    (F.col("date.utc") >= F.lit("2021-09-01T00:00:00.000Z")) &
    (F.col("date.utc") < F.lit("2021-10-01T00:00:00.000Z"))
)
start = time()
# Maximum PM10 in Berlin; ``.first()`` is an action, so this statement does
# all the heavy lifting inside the measured interval.
berlin_max = (
    smog_month_df
    .where((F.col("city") == "Berlin") &
           (F.col("parameter") == "pm10"))
    .select(F.max(F.col("value")))
    .first()
)[0]
cities = (
    smog_month_df
    .select("city", "value")
    .where(
        (F.col("country") == "PL") &
        (F.col("parameter") == "pm10")
    )
    .groupBy("city")
    .agg(F.avg("value").alias("avg_value"))
    .where(F.col("avg_value") < berlin_max)
    .orderBy(F.desc("avg_value"))
)
end = time()
# NOTE(review): Spark is lazy — building ``cities`` executes nothing, so the
# measured time covers only the ``berlin_max`` action above. Trigger e.g.
# ``cities.count()`` before taking ``end`` if the whole pipeline is meant to
# be timed.
print(end - start)
79.70714592933655
# Scaling analysis from manually collected runtimes (seconds) for 1..7 workers.
workers = np.arange(1, 8)
times = np.array([570, 294, 191, 135, 113, 107, 80])
# Speedup S(p) = T(1) / T(p); parallel efficiency E(p) = S(p) / p.
speedups = times[0] / times
efficiencies = speedups / workers
# Raw execution time vs. worker count.
plt.scatter(workers, times)
plt.xlabel("Number of workers")
plt.ylabel("Execution time")
plt.title("Execution time vs number of workers")
plt.show()
%matplot plt
plt.clf()
# Speedup relative to the single-worker run.
plt.scatter(workers, speedups)
plt.xlabel("Number of workers")
plt.ylabel("Speedup")
plt.title("Speedup vs number of workers")
plt.show()
%matplot plt
plt.clf()
# Efficiency: how close each configuration gets to ideal linear scaling.
plt.scatter(workers, efficiencies)
plt.xlabel("Number of workers")
plt.ylabel("Efficiency")
plt.title("Efficiency vs number of workers")
plt.show()
%matplot plt
Perform some calculation on the whole 2020 data. Show examples of Spark UI reports about it, e.g. DAG, Gantt diagram, data size information.
# Read the entire year 2020 and run a simple aggregation, so the job can be
# inspected in the Spark UI (DAG, task Gantt chart, shuffle/data sizes).
smog_2020_df = spark.read.json("s3a://openaq-fetches/realtime/2020-*-*/*")
# BUG FIX: the original bounds used the malformed form "T00:00:000Z"; with
# string comparison that shifts the boundary slightly. Use well-formed
# ISO-8601 literals instead.
smog_2020_df = smog_2020_df.where(
    (F.col("date.utc") >= F.lit("2020-01-01T00:00:00.000Z")) &
    (F.col("date.utc") < F.lit("2021-01-01T00:00:00.000Z"))
)
# Maximum PM10 value measured in Poland during 2020; ``.first()`` is the
# action that triggers the whole job. The result is a Row — index [0] would
# extract the scalar.
poland_pm10_max = (
    smog_2020_df
    .where((F.col("country") == "PL") &
           (F.col("parameter") == "pm10"))
    .select(F.max(F.col("value")))
    .first()
)